package com.dec.kks.etl;

import org.apache.commons.lang3.math.NumberUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.io.Serializable;

/**
 * Spark driver that extracts the purely numeric lines of a text file and
 * prints the distinct values in descending order.
 *
 * <p>Pipeline: read the file line by line, keep only lines made up solely of
 * digits that also fit in an {@code int}, convert them to {@link Integer},
 * de-duplicate, sort descending and print each value to standard output.
 */
public class KylinConnectLogSort implements Serializable {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "/home/hdfs/soft/IdeaProjects/dec-kks-etl/data/netstat20180903.txt";

    /**
     * Runs the job on a local Spark master.
     *
     * @param args optional; {@code args[0]} overrides the input file path.
     *             When absent, {@link #DEFAULT_INPUT_PATH} is read.
     */
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "/home/hdfs/bigdata/hadoop-2.7.4");
        SparkConf conf = new SparkConf()
                .setAppName(KylinConnectLogSort.class.getName())
                .setMaster("local");

        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = jsc.textFile(inputPath);

            // Keep only lines that can be converted to an Integer without failing.
            JavaRDD<String> numericLines = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) throws Exception {
                    return isIntLine(s);
                }
            });

            JavaRDD<Integer> distinctValues = numericLines.map(new Function<String, Integer>() {
                @Override
                public Integer call(String s) throws Exception {
                    return Integer.valueOf(s);
                }
            }).distinct(2);

            // ascending = false -> descending order; a single output partition
            // means all values are visited by one task in sorted order.
            distinctValues.sortBy(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer value) throws Exception {
                    return value;
                }
            }, false, 1)
                    .foreach(new VoidFunction<Integer>() {
                        @Override
                        public void call(Integer value) throws Exception {
                            System.out.println(value);
                        }
                    });
        } finally {
            // Release the Spark context even when the job throws.
            jsc.stop();
        }
    }

    /**
     * Tells whether a line is a digits-only string whose value fits in an
     * {@code int}.
     *
     * <p>{@link NumberUtils#isDigits(String)} alone accepts arbitrarily long
     * digit runs (for example {@code "99999999999"}), which would make the
     * later {@link Integer#valueOf(String)} call throw and abort the job, so
     * the range is verified here as well.
     *
     * @param s the line to inspect; may be {@code null}
     * @return {@code true} when {@code s} can safely be parsed as an int
     */
    private static boolean isIntLine(String s) {
        if (!NumberUtils.isDigits(s)) {
            return false;
        }
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException outOfRange) {
            // All characters are digits, so the only possible failure is overflow.
            return false;
        }
    }

}
